{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Using MLRUN with Dask Distributed Jobs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "# recommended, installing the exact package versions as we use in the worker\n",
    "#!pip install dask==2.12.0 distributed==2.14.0"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Writing a function code"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "# function that will be distributed\n",
    "def inc(x):\n",
    "    return x + 2"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The MLRun context in the case of Dask will have an extra param `dask_client`\n",
    "which is initialized based on the function spec (below), and can be used to submit Dask commands."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "# wrapper function, uses the dask client object\n",
    "def hndlr(context, x=1, y=2):\n",
    "    context.logger.info(\"params: x={},y={}\".format(x, y))\n",
    "    print(\"params: x={},y={}\".format(x, y))\n",
    "    x = context.dask_client.submit(inc, x)\n",
    "    print(x)\n",
    "    print(x.result())\n",
    "    context.log_result(\"y\", x.result())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "# mlrun: end-code\n",
    "# marks the end of a code section, do not delete"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "from mlrun import new_function, mlconf, code_to_function, mount_v3io, new_task\n",
    "\n",
    "# mlconf.dbpath = 'http://mlrun-api:8080'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define a Dask function object\n",
    "dask functions can be local (local workers), or remote (use containers in the cluster),\n",
    "in the case of `remote` users can specify the number of replica (optional) or leave blank for auto-scale.\n",
    "\n",
    "We use `code_to_function()` which packs the function code into the function object/yaml (eliminate the need to update the function image), we can use `new_function()` if the function code is part of the image or can be remotely mounted (e.g. via v3io mount).\n",
    "\n",
    "Dask function spec have several unique attributes:\n",
    "* **.remote** - bool, use local or clustered dask\n",
    "* **.replicas** - number of desired replicas, keep 0 for auto-scale\n",
    "* **.min_replicas, .max_replicas** - set replicas range for auto-scale\n",
    "* **.scheduler_timeout** - cluster will be killed after timeout (inactivity), default is `'60 minutes'`\n",
    "* **.nthreads** - number of worker threads\n",
    "* **.kfp_image** - optional, container image to use by KFP Pipeline runner (default to mlrun/dask)\n",
    "\n",
    "If you want to access the dask dashboard or scheduler from remote you need to use `NodePort` service type (set **.service_type** to 'NodePort'), and the external IP need to be specified in mlrun configuration (`mlconf.remote_host`), this will be set automatically if you are running on an Iguazio cluster."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "# create the function from the notebook code + annotations, add volumes\n",
    "dsf = code_to_function(\"mydask\", kind=\"dask\").apply(mount_v3io())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "dsf.spec.image = \"mlrun/ml-models\"\n",
    "dsf.spec.remote = True\n",
    "dsf.spec.replicas = 1\n",
    "dsf.spec.service_type = \"NodePort\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Build the function with extra packages\n",
    "We can skip the build section if we don't add packages (instead need to specify the image e.g. `dsf.spec.image='mlrun/ml-models'` which contains most of the packages you may need) "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "# uncomment if you want to add packages to the workers and build a new image\n",
    "# dsf.build_config(base_image='mlrun/ml-models', commands=['pip install pandas'])\n",
    "# dsf.deploy()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Run a task using our distributed dask function (cluster)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[mlrun] 2020-05-05 13:50:40,588 artifact path is not defined or is local, artifacts will not be visible in the UI\n",
      "[mlrun] 2020-05-05 13:50:40,598 starting run mydask2-hndlr uid=ebf42071e5a94cf0a5a79f701c43c096  -> http://10.199.227.162:8080\n",
      "[mlrun] 2020-05-05 13:50:45,217 trying dask client at: tcp://mlrun-mydask2-dfcb98f6-9.default-tenant:8786\n",
      "[mlrun] 2020-05-05 13:50:45,235 using remote dask scheduler (mlrun-mydask2-dfcb98f6-9) at: tcp://mlrun-mydask2-dfcb98f6-9.default-tenant:8786\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<a href=\"http://default-tenant.app.yh55.iguazio-cd2.com:31164/status\" target=\"_blank\" >dashboard link: default-tenant.app.yh55.iguazio-cd2.com:31164</a>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[mlrun] 2020-05-05 13:50:45,243 params: x=12,y=2\n",
      "params: x=12,y=2\n",
      "<Future: pending, key: inc-d94d5c09f5ea9337e23aad080db4545d>\n",
      "14\n",
      "\n",
      "[mlrun] 2020-05-05 13:50:48,646 run ended with state \n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<style> \n",
       ".dictlist {\n",
       "  background-color: #b3edff; \n",
       "  text-align: center; \n",
       "  margin: 4px; \n",
       "  border-radius: 3px; padding: 0px 3px 1px 3px; display: inline-block;}\n",
       ".artifact {\n",
       "  cursor: pointer; \n",
       "  background-color: #ffe6cc; \n",
       "  text-align: left; \n",
       "  margin: 4px; border-radius: 3px; padding: 0px 3px 1px 3px; display: inline-block;\n",
       "}\n",
       "div.block.hidden {\n",
       "  display: none;\n",
       "}\n",
       ".clickable {\n",
       "  cursor: pointer;\n",
       "}\n",
       ".ellipsis {\n",
       "  display: inline-block;\n",
       "  max-width: 60px;\n",
       "  white-space: nowrap;\n",
       "  overflow: hidden;\n",
       "  text-overflow: ellipsis;\n",
       "}\n",
       ".master-wrapper {\n",
       "  display: flex;\n",
       "  flex-flow: row nowrap;\n",
       "  justify-content: flex-start;\n",
       "  align-items: stretch;\n",
       "}\n",
       ".master-tbl {\n",
       "  flex: 3\n",
       "}\n",
       ".master-wrapper > div {\n",
       "  margin: 4px;\n",
       "  padding: 10px;\n",
       "}\n",
       "iframe.fileview {\n",
       "  border: 0 none;\n",
       "  height: 100%;\n",
       "  width: 100%;\n",
       "  white-space: pre-wrap;\n",
       "}\n",
       ".pane-header-title {\n",
       "  width: 80%;\n",
       "  font-weight: 500;\n",
       "}\n",
       ".pane-header {\n",
       "  line-height: 1;\n",
       "  background-color: #ffe6cc;\n",
       "  padding: 3px;\n",
       "}\n",
       ".pane-header .close {\n",
       "  font-size: 20px;\n",
       "  font-weight: 700;\n",
       "  float: right;\n",
       "  margin-top: -5px;\n",
       "}\n",
       ".master-wrapper .right-pane {\n",
       "  border: 1px inset silver;\n",
       "  width: 40%;\n",
       "  min-height: 300px;\n",
       "  flex: 3\n",
       "  min-width: 500px;\n",
       "}\n",
       ".master-wrapper * {\n",
       "  box-sizing: border-box;\n",
       "}\n",
       "</style><script>\n",
       "function copyToClipboard(fld) {\n",
       "    if (document.queryCommandSupported && document.queryCommandSupported('copy')) {\n",
       "        var textarea = document.createElement('textarea');\n",
       "        textarea.textContent = fld.innerHTML;\n",
       "        textarea.style.position = 'fixed';\n",
       "        document.body.appendChild(textarea);\n",
       "        textarea.select();\n",
       "\n",
       "        try {\n",
       "            return document.execCommand('copy'); // Security exception may be thrown by some browsers.\n",
       "        } catch (ex) {\n",
       "\n",
       "        } finally {\n",
       "            document.body.removeChild(textarea);\n",
       "        }\n",
       "    }\n",
       "}\n",
       "function expandPanel(el) {\n",
       "  const panelName = \"#\" + el.getAttribute('paneName');\n",
       "  console.log(el.title);\n",
       "\n",
       "  document.querySelector(panelName + \"-title\").innerHTML = el.title\n",
       "  iframe = document.querySelector(panelName + \"-body\");\n",
       "  \n",
       "  const tblcss = `<style> body { font-family: Arial, Helvetica, sans-serif;}\n",
       "    #csv { margin-bottom: 15px; }\n",
       "    #csv table { border-collapse: collapse;}\n",
       "    #csv table td { padding: 4px 8px; border: 1px solid silver;} </style>`;\n",
       "\n",
       "  function csvToHtmlTable(str) {\n",
       "    return '<div id=\"csv\"><table><tr><td>' +  str.replace(/[\\n\\r]+$/g, '').replace(/[\\n\\r]+/g, '</td></tr><tr><td>')\n",
       "      .replace(/,/g, '</td><td>') + '</td></tr></table></div>';\n",
       "  }\n",
       "  \n",
       "  function reqListener () {\n",
       "    if (el.title.endsWith(\".csv\")) {\n",
       "      iframe.setAttribute(\"srcdoc\", tblcss + csvToHtmlTable(this.responseText));\n",
       "    } else {\n",
       "      iframe.setAttribute(\"srcdoc\", this.responseText);\n",
       "    }  \n",
       "    console.log(this.responseText);\n",
       "  }\n",
       "\n",
       "  const oReq = new XMLHttpRequest();\n",
       "  oReq.addEventListener(\"load\", reqListener);\n",
       "  oReq.open(\"GET\", el.title);\n",
       "  oReq.send();\n",
       "  \n",
       "  \n",
       "  //iframe.src = el.title;\n",
       "  const resultPane = document.querySelector(panelName + \"-pane\");\n",
       "  if (resultPane.classList.contains(\"hidden\")) {\n",
       "    resultPane.classList.remove(\"hidden\");\n",
       "  }\n",
       "}\n",
       "function closePanel(el) {\n",
       "  const panelName = \"#\" + el.getAttribute('paneName')\n",
       "  const resultPane = document.querySelector(panelName + \"-pane\");\n",
       "  if (!resultPane.classList.contains(\"hidden\")) {\n",
       "    resultPane.classList.add(\"hidden\");\n",
       "  }\n",
       "}\n",
       "\n",
       "</script>\n",
       "<div class=\"master-wrapper\">\n",
       "  <div class=\"block master-tbl\"><div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th>project</th>\n",
       "      <th>uid</th>\n",
       "      <th>iter</th>\n",
       "      <th>start</th>\n",
       "      <th>state</th>\n",
       "      <th>name</th>\n",
       "      <th>labels</th>\n",
       "      <th>inputs</th>\n",
       "      <th>parameters</th>\n",
       "      <th>results</th>\n",
       "      <th>artifacts</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <td></td>\n",
       "      <td><div title=\"ebf42071e5a94cf0a5a79f701c43c096\"><a href=\"https://mlrun-ui.default-tenant.app.yh55.iguazio-cd2.com/projects//jobs/ebf42071e5a94cf0a5a79f701c43c096/info\" target=\"_blank\" >...1c43c096</a></div></td>\n",
       "      <td>0</td>\n",
       "      <td>May 05 13:50:40</td>\n",
       "      <td>completed</td>\n",
       "      <td>mydask2-hndlr</td>\n",
       "      <td><div class=\"dictlist\">v3io_user=admin</div><div class=\"dictlist\">kind=dask</div><div class=\"dictlist\">owner=admin</div><div class=\"dictlist\">host=jupyter-65887d7ffb-5jsn2</div></td>\n",
       "      <td></td>\n",
       "      <td><div class=\"dictlist\">x=12</div></td>\n",
       "      <td><div class=\"dictlist\">y=14</div></td>\n",
       "      <td></td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div></div>\n",
       "  <div id=\"result5b5e68a6-pane\" class=\"right-pane block hidden\">\n",
       "    <div class=\"pane-header\">\n",
       "      <span id=\"result5b5e68a6-title\" class=\"pane-header-title\">Title</span>\n",
       "      <span onclick=\"closePanel(this)\" paneName=\"result5b5e68a6\" class=\"close clickable\">&times;</span>\n",
       "    </div>\n",
       "    <iframe class=\"fileview\" id=\"result5b5e68a6-body\"></iframe>\n",
       "  </div>\n",
       "</div>\n"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "to track results use .show() or .logs() or in CLI: \n",
      "!mlrun get run ebf42071e5a94cf0a5a79f701c43c096  , !mlrun logs ebf42071e5a94cf0a5a79f701c43c096 \n",
      "[mlrun] 2020-05-05 13:50:48,671 run executed, status=completed\n"
     ]
    }
   ],
   "source": [
    "myrun = dsf.run(handler=hndlr, params={\"x\": 12})"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{'scheduler_address': 'tcp://mlrun-mydask2-dfcb98f6-9.default-tenant:8786',\n",
       " 'cluster_name': 'mlrun-mydask2-dfcb98f6-9',\n",
       " 'node_ports': {'dashboard': 31164, 'scheduler': 32718}}"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# get the function status and addresses\n",
    "dsf.status.to_dict()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Accessing the dask client directly\n",
    "You can get the dask client object and cluster information by reading the client object.\n",
    "\n",
    "> Note: the cluster can timeout, when you call the client MLRun will also verify the cluster is live and active and if not it will restart the dask cluster and refresh the function object with the latest addresses and status."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[mlrun] 2020-05-05 13:51:32,710 trying dask client at: tcp://mlrun-mydask2-dfcb98f6-9.default-tenant:8786\n",
      "[mlrun] 2020-05-05 13:51:32,719 using remote dask scheduler (mlrun-mydask2-dfcb98f6-9) at: tcp://mlrun-mydask2-dfcb98f6-9.default-tenant:8786\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/conda/lib/python3.6/site-packages/distributed/client.py:1079: VersionMismatchWarning: Mismatched versions found\n",
      "\n",
      "python\n",
      "+-------------------------+---------------+\n",
      "|                         | version       |\n",
      "+-------------------------+---------------+\n",
      "| client                  | 3.6.8.final.0 |\n",
      "| scheduler               | 3.7.6.final.0 |\n",
      "| tcp://10.200.0.57:36075 | 3.7.6.final.0 |\n",
      "+-------------------------+---------------+\n",
      "  warnings.warn(version_module.VersionMismatchWarning(msg[0][\"warning\"]))\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<a href=\"http://default-tenant.app.yh55.iguazio-cd2.com:31164/status\" target=\"_blank\" >dashboard link: default-tenant.app.yh55.iguazio-cd2.com:31164</a>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "c = dsf.client"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Access a dask function using the DB\n",
    "If we want to access the dask function (or its cluster), we can load the function object from the DB (assuming we already .run() or .save() it).\n",
    "\n",
    "This can be useful if we want to load the same function in a different notebook or container, or if we restarted our notebook"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[mlrun] 2020-05-05 13:53:09,000 trying dask client at: tcp://mlrun-mydask2-dfcb98f6-9.default-tenant:8786\n",
      "[mlrun] 2020-05-05 13:53:09,007 using remote dask scheduler (mlrun-mydask2-dfcb98f6-9) at: tcp://mlrun-mydask2-dfcb98f6-9.default-tenant:8786\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<a href=\"http://default-tenant.app.yh55.iguazio-cd2.com:31164/status\" target=\"_blank\" >dashboard link: default-tenant.app.yh55.iguazio-cd2.com:31164</a>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "from mlrun import import_function\n",
    "\n",
    "# Functions url: db://<project>/<name>[:tag]\n",
    "dsf_obj = import_function(\"db://default/mydask\")\n",
    "c = dsf_obj.client"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Building a Pipeline using dask functions"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp\n",
    "from kfp import dsl\n",
    "from mlrun import run_pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [],
   "source": [
    "@dsl.pipeline(name=\"dask_pipeline\")\n",
    "def dask_pipe(x=1, y=10):\n",
    "    # use_db option will use a function (DB) pointer instead of adding the function spec to the YAML\n",
    "    myrun = dsf.as_step(\n",
    "        new_task(handler=hndlr, name=\"dask_pipeline\", params={\"x\": x, \"y\": y}),\n",
    "        use_db=True,\n",
    "    )\n",
    "\n",
    "    # if the step (dask client) need v3io access u should add: .apply(mount_v3io())\n",
    "\n",
    "    # if its a new image we may want to tell Kubeflow to reload the image\n",
    "    # myrun.container.set_image_pull_policy('Always')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/conda/lib/python3.6/site-packages/kfp/components/_data_passing.py:168: UserWarning: Missing type name was inferred as \"Integer\" based on the value \"1\".\n",
      "  warnings.warn('Missing type name was inferred as \"{}\" based on the value \"{}\".'.format(type_name, str(value)))\n",
      "/conda/lib/python3.6/site-packages/kfp/components/_data_passing.py:168: UserWarning: Missing type name was inferred as \"Integer\" based on the value \"10\".\n",
      "  warnings.warn('Missing type name was inferred as \"{}\" based on the value \"{}\".'.format(type_name, str(value)))\n"
     ]
    }
   ],
   "source": [
    "# for pipeline debug\n",
    "kfp.compiler.Compiler().compile(dask_pipe, \"daskpipe.yaml\", type_check=False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "data": {
      "text/html": [
       "Experiment link <a href=\"https://dashboard.default-tenant.app.yh55.iguazio-cd2.com/pipelines/#/experiments/details/b1353b31-1116-460a-abec-c3eb2c53697f\" target=\"_blank\" >here</a>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/html": [
       "Run link <a href=\"https://dashboard.default-tenant.app.yh55.iguazio-cd2.com/pipelines/#/runs/details/8b19aee4-84cd-4960-8b24-1324c57612d5\" target=\"_blank\" >here</a>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[mlrun] 2020-05-05 13:53:55,047 Pipeline run id=8b19aee4-84cd-4960-8b24-1324c57612d5, check UI or DB for progress\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "'8b19aee4-84cd-4960-8b24-1324c57612d5'"
      ]
     },
     "execution_count": 18,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "arguments = {\"x\": 4, \"y\": -5}\n",
    "artifact_path = \"/User/test\"\n",
    "run_id = run_pipeline(\n",
    "    dask_pipe,\n",
    "    arguments,\n",
    "    artifact_path=artifact_path,\n",
    "    run=\"DaskExamplePipeline\",\n",
    "    experiment=\"dask pipe\",\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from mlrun import wait_for_pipeline_completion, get_run_db\n",
    "\n",
    "wait_for_pipeline_completion(run_id)\n",
    "db = get_run_db().connect()\n",
    "db.list_runs(project=\"default\", labels=f\"workflow={run_id}\").show()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
